
from threading import Thread, RLock, Lock
import sys
import time
import os

from ma.commons.core.taskinfo import *
from ma.commons.core.constants import * 
from ma.commons.core.nodetrackerstatus import *
from ma.utils.timerclass import *

from .taskscheduler import *
from .hdfsinteracter import *
from ma.commons.core.newmaptaskaction import *
from ma.commons.core.maptipinfo import *
from ma.commons.core.reducetipinfo import *
from ma.mrimprov.commons.maptip import *
from ma.mrimprov.commons.reducetip import *
from ma.commons.core.expireunresponsivetasks import *
from ma.commons.core.completedtaskinfo import *
from ma.commons.core.failedtaskinfo import *
import ma.commons.core.outputserver as outputserver

import logging
import logging.config
import ma.const
import ma.log
from ma.fs.dfs._hdfs import HDFS
from ma.stats.statster import Statster



class NodeTracker(Thread):
    
    """This class will manage the functions of a single node on the cluster, 
    all its communication with the JobsTracker will be initiated by this node. 
    """
    
    def startNewMapTask(self, newmaptaskaction):
        
        """
        this func is called if the JobTracker responds to heartbeat() 
        directive to launch a new task; the task is added to the running
        tasks dict and started by the task runner as a seperate thread
        """
        
        #a new mapTIPInfo object is created
        maptip_info = MapTIPInfo(newmaptaskaction.task_id, 
                                 newmaptaskaction.job_id,
                                 newmaptaskaction.input_data,
                                 newmaptaskaction.struct_id)
       
        #logging that the map task was initiated
        self.__log.info("A Map task %d initiated with Job Id %d", maptip_info.task_id, maptip_info.job_id)
        
        # a new MapTIP object is created to be added to the running tasks dict                                 
        maptip = MapTIP(self, maptip_info, newmaptaskaction.struct_id)
        self.threads.append(maptip)
        
        #add the MapTIP to the running tasks dict
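        #each entry pairs the TIP thread with a per-task RLock used by acquireTIPLock/releaseTIPLock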
        if(maptip_info.job_id in self.running_map_tasks):
            (self.running_map_tasks[maptip_info.job_id])[maptip_info.task_id] = [maptip, RLock()]
        else:
            self.running_map_tasks[maptip_info.job_id]= {maptip_info.task_id: [maptip,RLock()]}
            self.jobs.append(maptip_info.job_id)
            self.completed_maps[int(maptip_info.job_id)] =[]
            
        ping_id = maptip_info.returnTaskID()
        
        #add the MapTIP to the tasks_last_ping dict
        if(maptip_info.job_id in self.tasks_last_ping):
            (self.tasks_last_ping [maptip_info.job_id])[ping_id]= [maptip_info.map_or_reduce, time.time(), RLock()]
        else:
            self.tasks_last_ping[maptip_info.job_id]= {ping_id: [maptip_info.map_or_reduce, time.time(), RLock()]}
                        
        #call start() on the TaskInProgress object to launch the task
        (self.running_map_tasks[maptip_info.job_id])[maptip_info.task_id][0].start()
        
        #a task slot is now occupied, so decrement the vacancy on this node_tracker
        self.task_vacancy -= 1
    
    
    def __init__(self, tracker_id, forced_job_id=None):
        
        """all ctor work will be done in func initialize(). 
        The NodeTracker houses three task dictionaries below, each is meant to have a key 
        pointing to another dictionary, the key being the job_id and the value being 
        another dictionary with TIP_id as key and the TaskinProgress object as value 
        """
        #initializing the logger
        self.__log = ma.log.get_logger('ma.mrimprov')
        self.__log.info('Starting up the MR+ Nodetracker')
        
        self.threads = []
        
        self.jobs = []
        
        # This HDFS reference is for fetching or writing input, output or any
        # intermediary files (Note: not for interacting with the DFS flags)
        host = ma.const.XmlData.get_str_data(ma.const.xml_hdfs_host)
        port = ma.const.XmlData.get_int_data(ma.const.xml_hdfs_port)
        self.hdfs  = HDFS(host, port)
        
        self.hdfs_interactor = HDFSInteracter(self, forced_job_id)
        
        # start output server (data shuffler)
        self.output_server = outputserver.OutputServer(self)
        
        Thread.__init__(self)
        
        # dicts of running tasks. These and the subsequent pairs of dicts are 
        # nested dictionaries: the outer key is the job_id and the value is 
        # another dict keyed by task_id, whose values are [MapTIP or ReduceTIP, 
        # per-task RLock] entries 
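        # e.g. running_map_tasks = {job_id: {task_id: [MapTIP, RLock()]}}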
        self.running_map_tasks = {}
        self.running_reduce_tasks = {}
        
        # dicts of tasks that have completed (same nested layout as above)
        self.complete_map_tasks = {}
        self.complete_reduce_tasks = {}
        
        # dicts of tasks that have failed or failed to respond (same nested layout)
        self.failed_map_tasks = {}
        self.failed_reduce_tasks = {}
                
        #if a task does not send a health ping within this interval it is 
        #considered expired
        self.taskexpiry_interval = ma.const.XmlData.get_float_data(ma.const.xml_nt_task_expiry_interval)
        
        #a NodeTrackerStatus object built up to be sent to the JobsTracker along
        #with every heartbeat
        self.tacker_status = NodeTrackerStatus(tracker_id)
        
        #the interval between heartbeats; the heartbeat is sent to the JobTracker
        self.heartbeat_interval = ma.const.XmlData.get_float_data(ma.const.xml_nt_heartbeat_interval)
        
        #a dict that tracks health pings from tasks running at this NodeTracker;
        #like the running tasks dicts, this is a nested dict
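        # e.g. tasks_last_ping = {job_id: {ping_id: [map_or_reduce, last_ping_time, RLock()]}}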
        self.tasks_last_ping = {}
        
        self.timer = TimerClass(ma.const.XmlData.get_float_data(ma.const.xml_check_on_timers_interval))
        self.timer.start()
        self.threads.append(self.timer)
        
        #the number of free task slots (vacancy) currently available on this node_tracker
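        #decremented when a task starts; incremented back when a task completes, fails, or is killed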
        self.task_vacancy = ma.const.XmlData.get_int_data(ma.const.xml_max_no_tasks)
        
        # for logging stats
        self.statster = Statster(self, forced_job_id)
        
        self.initialize()
                
        # a list of info objects that is built up from one heartbeat to the next
        # and passed along every heartbeat call to the HDFSInteracter
        self.info_objects_list = []
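        # CompletedTaskInfo / FailedTaskInfo objects are appended here by taskComplete()/taskFailed() and drained on the next heartbeat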
        
        # a list of action objects returned by the HDFSInteracter, dictating 
        # the actions to be taken by the NT in response to its heartbeat call 
        self.actions = []
        
        # self.expiredeadtasks = ExpireUnresponsiveTasks(self, ma.const.XmlData.get_float_data(ma.const.xml_check_task_expiry_interval))
        
        #a dict that houses the completed maps per job: job_id -> [list of completed map tasks] 
        self.completed_maps = {0:[]}
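        #the per-job list is created empty when the first map task for that job arrives (see startNewMapTask)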
        
        # lock for accessing heartbeat
        self.__lock = Lock()
              
                          
    def initialize(self):
        
        """all ctor work will be done in this func so that the NodeTracker may be 
        re-initializes whenever it comes up again after failure 
        """
        
        #reinitialize dict of running tasks task_id -> TIP
        self.running_map_tasks.clear()
        self.running_reduce_tasks.clear()
        
        #reinitialize dict of complete tasks task_id -> TIP
        self.complete_map_tasks.clear()
        self.complete_reduce_tasks.clear()
        
        #reinitialize dict of failed tasks task_id -> TIP
        self.failed_map_tasks.clear()
        self.failed_reduce_tasks.clear()
        
        #reinitialize dict of tasks ping times 
        self.tasks_last_ping.clear()
        
        #reset the number of free task slots (vacancy) on this node_tracker
        self.task_vacancy = ma.const.XmlData.get_int_data(ma.const.xml_max_no_tasks)
        
        self.__log.info('(Re)Initialized MR Nodetracker')
        
                                   
    def run(self):        
        """this func is the main service loop and will run forever 
        """
        self.__log.info('Starting up the MR+ NT\'s Heartbeat')
            
        #the timer for heart beat initialized
        self.hrtbeattimer = self.timer.addTimer(self.heartbeat_interval, self.heartbeat,[])
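        # heartbeat() will now be invoked periodically by the shared TimerClass started in __init__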
        
        # TODO: take out the while loop in the run function
        
        while True:
            time.sleep(5)
        
            
    def heartbeat(self):
        """this func will send a TaskInfo objects list to the HDFSCommunicator
        periodically and retrieve an actions list from this call  
        """
                
        # TODO: code for talking to HDFS according to info objects in NTStatus object
        # 1) check task capacity on NT and pick up new map or reduce tasks from the
        #    HDFS after consulting the NT scheduler
        # 2) mark failed tasks as failed
        # 3) mark completed tasks as complete
        # 4) mark shuffle completed on completed tasks as the output of a map or 
        #    reduce task is read by another task
        with self.__lock:
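            # pass a copy of the info objects list (the "+ []" creates a new list) before it is cleared below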
            self.actions = self.hdfs_interactor.heartbeatFromNT(self.info_objects_list + [], self.task_vacancy)
            
            self.__log.debug('Actions returned from heartbeat: %s', str(self.actions))
            
            
            # TODO: work out a series of actions for the NT relevant to the info objects 
            self.processHeartbeatResponse(self.actions)
           
            #reinitialize the info objects list for the next heartbeat call
            self.info_objects_list = []
        
            
    def processHeartbeatResponse(self, actions):
        
        """This function will iterate over the actions list sent by the JobsTracker
        """
        
        for action in actions:
            
            action_type = action.action_type
            
            #if Action object is NewTaskAction
            if action_type == NEWMAPTASKACTION_TYPE:
            
                #launch the new map task
                self.startNewMapTask(action)
                
            #if Action object is NewReduceTaskAction
            elif action_type == NEWREDUCETASKACTION_TYPE:
                #create a new ReduceTIP 
                self.startNewReduceTask(action)
            
            # TODO: killTask now requires a map_or_reduce parameter; this call 
            # still passes only (job_id, task_id) and needs to be updated
            #if Action object is KillTaskAction
            elif action_type == KILLTASKACTION_TYPE:
                self.killTask(action.job_id, action.task_id)
            
            #if Action object is KillJobAction
            elif action_type == KILLJOBACTION_TYPE:
                self.killJob(action.job_id)
            
            #if Action object is ReinitNodeTrackerAction
            elif action_type == REINITNODETRACKERACTION_TYPE:
                self.initialize()
            
            #if Action object is DeadNodeTrackerAction
            elif action_type == DEADNODETRACKERACTION_TYPE:
                self.updateIPTranslationEntry(action.dead_node_id)
            
            #if Action object is CompleteJobAction
            elif action_type == COMPLETEJOBSACTION_TYPE:
                self.doHouskeepingForCompleteJobs(action.jobs_list)
        
            #if Action object is NewJobAction
            elif action_type == REFRESHJOBSACTION_TYPE:
                #add the job to the schedular's job list 
                self.task_scheduler.refreshJobsToDict(action.jobs_dict)
                
            
    def getCompleteTaskOutputPath(self, job_id, map_or_reduce, task_id, dest_task_id):      
        """this func will return the output path for a completed task's output
        if this NT computed that task id to completion. If not, then return 
        None
        """
        
        if map_or_reduce == MAP:
            # if map
            if job_id in self.complete_map_tasks and task_id in self.complete_map_tasks[job_id]:
                # get the standard output filename
                self.output_dest_dir = ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_output_temp_dir, job_id)
                output_filepath = self.output_dest_dir + os.sep + \
                    ma.const.JobsXmlData.get_str_data(ma.const.xml_map_output_filename, job_id, task_id)
                    
                # double check if the file exists
                if os.path.isfile(output_filepath):
                    return output_filepath
                else:
                    self.__log.error('Map task %d is complete but its output is not present at the standard path: %s', task_id, output_filepath)
                    raise IOError('Missing output file for job id %d, map task %d' % (job_id, task_id))
        else:
            # if reduce
            if job_id in self.complete_reduce_tasks and task_id in self.complete_reduce_tasks[job_id]:
                # get the standard output filename
                self.output_dest_dir = ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_output_temp_dir, job_id)
                output_filepath = self.output_dest_dir + os.sep + \
                    ma.const.JobsXmlData.get_str_data(ma.const.xml_reduce_output_filename, job_id, task_id)
                    
                # double check if the file exists
                if os.path.isfile(output_filepath):
                    return output_filepath
                else:
                    self.__log.error('Reduce task %d is complete but its output is not present at the standard path: %s', task_id, output_filepath)
                    raise IOError('Missing output file for job id %d, reduce task %d' % (job_id, task_id))
                
        return None
        
  
    def doHouskeepingForCompleteJobs(self, jobs_list):
        """this function will do housekeeping for complete jobs
        """
        self.__log.error('Need to implement housekeeping for completed job')
        raise NotImplementedError('Complete Jobs housekeeping function yet to be implemented')

    
    def acquireTIPref(self, tip_dict, job_id, task_id):
        
        """Return the TIP object stored in the given nested dict for 
        (job_id, task_id), or None if no such entry exists 
        """
        
        if job_id in tip_dict and task_id in tip_dict[job_id]:
            tip, lock = (tip_dict[job_id])[task_id]
            return tip
        else:
            return None
        
         
    def acquireTIPLock(self, tip_dict, job_id, task_id):
        """Acquire the per-task lock for the (job_id, task_id) entry in the 
        given nested dict; return True on success, False if no such entry exists
        """    
        if job_id in tip_dict and task_id in tip_dict[job_id]:
            tip, lock = (tip_dict[job_id])[task_id]
            lock.acquire()
            return True
        else:
            return False      
    
    
    def releaseTIPLock(self, tip_dict, job_id, task_id):
        """Release the per-task lock for the (job_id, task_id) entry in the 
        given nested dict; return True on success, False if no such entry exists
        """
        if job_id in tip_dict and task_id in tip_dict[job_id]:
            tip, lock = (tip_dict[job_id])[task_id]
            lock.release()
            return True
        else:
            return False
      
        
    def updateIPTranslationEntry(self, dead_node_id):
        """This function will update the IP Translation table in case of a 
        NodeTracker failure 
        """
        self.__log.error('Need to implement the IP Translation table update for dead NodeTracker %s', dead_node_id)
        #self.__ip_trans_table.adjustTableDueDeadNode(dead_node_id)
        raise NotImplementedError('Update IP Translation table for dead nodes needs to be implemented')
        
    
    
    def killJob(self, job_id):
        """This function kills a job and all its tasks running at this NodeTracker by removing all tasks
        from the running tasks list and the health ping list
        """
        
        if job_id in self.running_map_tasks:
            #the job's tasks are no longer running, so free up their slots
            self.task_vacancy += len(self.running_map_tasks[job_id])
            
            #del all tasks of a job from the running tasks list
            del self.running_map_tasks[job_id]
        
        if job_id in self.running_reduce_tasks:
            #the job's tasks are no longer running, so free up their slots
            self.task_vacancy += len(self.running_reduce_tasks[job_id])
            
            #del all tasks of a job from the running tasks list
            del self.running_reduce_tasks[job_id]
                
                
        if job_id in self.tasks_last_ping:                
            #del all tasks from the last ping of tasks dict
            del self.tasks_last_ping[job_id]
        
        if job_id in self.complete_map_tasks:
            #del all tasks from the completed tasks list
            del self.complete_map_tasks[job_id]
            
        if job_id in self.complete_reduce_tasks:
            #del all tasks from the completed tasks list
            del self.complete_reduce_tasks[job_id]
        
        if job_id in self.failed_map_tasks:
            #del all tasks from the failed tasks dict
            del self.failed_map_tasks[job_id]
            
        if job_id in self.failed_reduce_tasks:
            #del all tasks from the failed tasks dict
            del self.failed_reduce_tasks[job_id]             
                       
    
    def killTask(self, job_id, map_or_reduce, task_id):
        """This function kills a task by removing it from the running tasks list
        and the health ping list
        """
        self.__log.info("Kill %s task %d request to NT for Job Id %d", map_or_reduce, task_id, job_id)
        
        # for maps
        if map_or_reduce == MAP:
            if job_id in self.running_map_tasks and task_id in self.running_map_tasks[job_id]:
                
                #del the killed task from the running tasks list
                del (self.running_map_tasks[job_id])[task_id]
            
                #if the task is the only one for this job_id at this NT then delete the Job_id entry
                if len(self.running_map_tasks[job_id]) == 0:
                    del self.running_map_tasks[job_id]
        else:
            if job_id in self.running_reduce_tasks and task_id in self.running_reduce_tasks[job_id]:
                
                #del the killed task from the running tasks list
                del (self.running_reduce_tasks[job_id])[task_id]
            
                #if the task is the only one for this job_id at this NT then delete the Job_id entry
                if len(self.running_reduce_tasks[job_id]) == 0:
                    del self.running_reduce_tasks[job_id]
        
        
        # TODO: tasks_last_ping also needs to be bifurcated into maps and reduces        
        if job_id in self.tasks_last_ping and task_id in self.tasks_last_ping[job_id]:
            #del the killed task from the last ping of tasks dict
            del (self.tasks_last_ping[job_id])[task_id]
        
        #a task slot has been freed, so increment the vacancy on this node_tracker
        self.task_vacancy += 1
    
    
    def startNewReduceTask(self, newreducetaskaction):
        """ This function will add all reduce task for a job from awaiting tasks dict to 
        the running tasks dictionary as the shuffle phase for a job complete
        """
        
        #a new reduceTIPInfo object is created
        reducetip_info = ReduceTIPInfo(newreducetaskaction.job_id,
                                       newreducetaskaction.task_id,
                                       newreducetaskaction.input_data,
                                       newreducetaskaction.reduce_level,
                                       newreducetaskaction.struct_id)
                                     
        #logging that the reduce task was initiated
        self.__log.info("A Reduce task %d initiated with Job Id %d", reducetip_info.task_id, reducetip_info.job_id)
                                         
        # a new ReduceTIP object is created to be added to the running tasks dict
        reducetip = ReduceTIP(self, reducetip_info, newreducetaskaction.reduce_level, newreducetaskaction.struct_id)
        self.threads.append(reducetip)
        
        #add the ReduceTIP to the running tasks dict
        if(reducetip_info.job_id in self.running_reduce_tasks):
            (self.running_reduce_tasks[reducetip_info.job_id])[reducetip_info.task_id] = [reducetip, RLock()]
        else:
            self.running_reduce_tasks[reducetip_info.job_id]= {reducetip_info.task_id: [reducetip,RLock()]}
            self.jobs.append(reducetip_info.job_id)
        
        ping_id = reducetip_info.returnTaskID()
        
        #add the ReduceTIP to the tasks_last_ping dict
        if(reducetip_info.job_id in self.tasks_last_ping):
            (self.tasks_last_ping [reducetip_info.job_id])[ping_id] = [reducetip_info.map_or_reduce,time.time(),RLock()]
        else:
            self.tasks_last_ping[reducetip_info.job_id]= {ping_id: [reducetip_info.map_or_reduce,time.time(),RLock()]}
        
        #call start() on the TaskInProgress object to launch the task
        (self.running_reduce_tasks[reducetip_info.job_id])[reducetip_info.task_id][0].start()
        
        #a task slot is now occupied, so decrement the vacancy on this node_tracker
        self.task_vacancy -= 1
    
                       
    def taskComplete(self, task_info):
        
        """func is called from within a task running in a separate process; when a 
        task completes; this task is removed from the running tasks dict and 
        added to the completed tasks dict
        """
        
        with self.__lock:
            job_id = task_info.job_id
            task_id = task_info.task_id
            m_or_r = task_info.map_or_reduce
            
            self.__log.debug('NT got a notification from a %s task %d of job-id %d for completion', m_or_r, task_id, job_id)
            
            # generate ping id
            ping_id = task_info.returnTaskID()
            
            if job_id in self.tasks_last_ping and ping_id in self.tasks_last_ping[job_id]:
                # remove task off the health ping list
                del (self.tasks_last_ping[job_id])[ping_id]
            
            if m_or_r == MAP:
                if job_id not in self.complete_map_tasks:
                    self.complete_map_tasks[job_id] = {}
                
                if job_id in self.running_map_tasks:
                    if task_id in self.running_map_tasks[job_id]:
                        # move task from running to completed tasks
                        (self.complete_map_tasks[job_id])[task_id] = (self.running_map_tasks[job_id]).pop(task_id)
                
                completedTask = CompletedTaskInfo(task_id, job_id, m_or_r, ((self.complete_map_tasks[job_id])[task_id])[0].struct_id)
                
            else:
                if job_id not in self.complete_reduce_tasks:
                    self.complete_reduce_tasks[job_id] = {}
                    
                if job_id in self.running_reduce_tasks:
                    if task_id in self.running_reduce_tasks[job_id]:
                        #move task from running to completed tasks
                        (self.complete_reduce_tasks[job_id])[task_id] = (self.running_reduce_tasks[job_id]).pop(task_id)
                        
                completedTask = CompletedTaskInfo(task_id, job_id, m_or_r, ((self.complete_reduce_tasks[job_id])[task_id])[0].struct_id,((self.complete_reduce_tasks[job_id])[task_id])[0].reduce_level)
            
            self.info_objects_list.append(completedTask)
            
            #a task slot has been freed, so increment the vacancy on this node_tracker
            self.task_vacancy += 1

    
    def taskFailed(self, job_id, task_id, map_or_reduce):
        
        """func is called from within a task running in a separate process; when a 
        task fails; this task is removed from the running tasks dict and 
        added to the failed tasks dict
        """
        
        self.__log.info("Failed %s task %d request to NT for Job Id %d", map_or_reduce, task_id, job_id)
        
        failedTask = FailedTaskInfo(task_id, job_id, map_or_reduce)
        
        self.info_objects_list.append(failedTask)
               
        #a task slot has been freed, so increment the vacancy on this node_tracker
        self.task_vacancy += 1
            
        
    def healthPingFromTask(self, job_id, task_id, m_or_r):
        """func is called from within a task running in a separate process; to update 
        its health status with the NodeTracker
        """
        with self.__lock:
            #refresh the last ping time for a task if this task_id exists and has not been
            #killed
            ping_tid = str(task_id) + '_' + m_or_r
            if job_id in self.tasks_last_ping and ping_tid in self.tasks_last_ping[job_id]:
                #update only the timestamp so the [map_or_reduce, last_ping_time, lock] entry keeps its lock
                (self.tasks_last_ping[job_id])[ping_tid][1] = time.time()
                       
            self.__log.debug("%s Task-id %d with job_id %d just pinged the NT", m_or_r, task_id, job_id) 
           
                
    def localizeJob(self, job_id):
        """Run for copying all essential files from the DFS to the local
        directory whenever this NT gets a job whose task it hasn't done 
        before. It also creates the relevant directories in the local FS
        """
        try:
            self.__log.info("Initiating / copying config files for Job id %d", job_id)
            
            # get consts
            input_path = ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_input_temp_dir, job_id)
            output_path = ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_output_temp_dir, job_id)
            compute_path = ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_compute_temp_dir, job_id)
            
            # create directories
            if not os.path.isdir(input_path):
                os.makedirs(input_path)
            if not os.path.isdir(output_path):
                os.makedirs(output_path)
            if not os.path.isdir(compute_path):
                os.makedirs(compute_path)
            
            # copy job conf
            job_conf_local_dir = os.path.dirname(ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_input_temp_dir, job_id))
            job_conf_dfs_filepath = ma.const.JobsXmlData.get_dfs_filepath_str_data(ma.const.xml_dfs_path_job_conf, job_id)
            
            if self.hdfs.check_path(job_conf_dfs_filepath) == 1:
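                # check_path() returning 1 is taken to mean the job conf file exists on the DFS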
                # delete if already exists
                local_jobconf_filepath = job_conf_local_dir + os.sep + os.path.basename(job_conf_dfs_filepath)
                if os.path.isfile(local_jobconf_filepath):
                    os.remove(local_jobconf_filepath)
                
                self.hdfs.copy_file_to_local(job_conf_dfs_filepath, job_conf_local_dir, job_id, auto_retry=True)
            else:
                self.__log.error('Job Conf for job_id %d does not exist', job_id)
            
            # reinitialize the jobs XML data to remove all previous values
            ma.const.JobsXmlData.reinitialize(job_id)
            
        except Exception as err_msg:
            self.__log.error('Error while localizing new job: %s', err_msg) 


    def returnNoOfMaps(self, job_id=None):
        """This function returns the number of running map tasks, and the 
        number of completed mao tasks on this nodetracker. If the job_id given
        is None, it will give a total count, otherwise it will give job
        specific stats
        """
        
        no_of_running_maps = 0
        no_of_complete_maps = 0
        
        if job_id is None:
            # if need global maps
            for key in self.running_map_tasks:
                no_of_running_maps += len(self.running_map_tasks[key])
        
            for key in self.complete_map_tasks:
                no_of_complete_maps += len(self.complete_map_tasks[key])
        else:
            # if need job specific maps
            temp_list = self.running_map_tasks.get(job_id, [])
            no_of_running_maps += len(temp_list)
            
            temp_list = self.complete_map_tasks.get(job_id, [])
            no_of_complete_maps += len(temp_list)
            
        return no_of_running_maps, no_of_complete_maps


    def returnNoOfReduces(self, job_id=None):
        """This function returns the number of running reduce tasks, and the 
        number of completed reduce tasks on this nodetracker. If the job_id
        given is None, it will give a total count, otherwise it will give job
        specific stats
        """
        
        no_of_running_reduces = 0
        no_of_complete_reduces = 0
        
        if job_id is None:
            # if need global reduces
            for key in self.running_reduce_tasks:
                no_of_running_reduces += len(self.running_reduce_tasks[key])
                
            for key in self.complete_reduce_tasks:
                no_of_complete_reduces += len(self.complete_reduce_tasks[key])
        else:
            # if need job specific reduces
            temp_list = self.running_reduce_tasks.get(job_id, [])
            no_of_running_reduces += len(temp_list)
            
            temp_list = self.complete_reduce_tasks.get(job_id, [])
            no_of_complete_reduces += len(temp_list)

        return no_of_running_reduces, no_of_complete_reduces



def main(argv=None):
    
    print('--- Starting up MR+ NodeTracker compute engine ---')
    
    if len(sys.argv) == 2:
        forced_job_id = int(sys.argv[1])
        print('--> Forced job id', forced_job_id)
        nodetracker = NodeTracker(1, forced_job_id)
    else:
        nodetracker = NodeTracker(1)
    
    nodetracker.start()
    nodetracker.threads.append(nodetracker)
    for th in nodetracker.threads:
        th.join()      
        
    
         
if __name__ == "__main__":

    sys.exit(main())    
        
    